Skip to main content

🛠️ Thực hành và tối ưu

· 25 min read

Bắt lỗi và log chi tiết khi thao tác SQL Server bằng Python

Bắt lỗi và log chi tiết SQL Server

Giới thiệu

Khi làm việc với hệ thống phân tích dữ liệu kết hợp Python và SQL Server, việc xử lý lỗi và ghi log chi tiết là một phần không thể thiếu để đảm bảo tính ổn định và khả năng bảo trì của ứng dụng. Một hệ thống ghi log được thiết kế tốt giúp phát hiện, phân tích và khắc phục lỗi nhanh chóng, đồng thời cung cấp thông tin quý giá về hiệu suất và hành vi của hệ thống. Bài viết này sẽ hướng dẫn chi tiết các kỹ thuật bắt lỗi và thiết lập hệ thống log hiệu quả khi thao tác với SQL Server từ Python.

1. Tổng quan về xử lý lỗi và logging trong Python

1.1. Các loại lỗi thường gặp khi làm việc với SQL Server

Khi thao tác với SQL Server từ Python, chúng ta thường gặp các loại lỗi sau:

  1. Lỗi kết nối: Không thể kết nối đến máy chủ SQL Server
  2. Lỗi xác thực: Sai thông tin đăng nhập
  3. Lỗi cú pháp SQL: Lỗi trong câu lệnh SQL
  4. Lỗi thời gian chờ: Truy vấn mất quá nhiều thời gian để thực thi
  5. Lỗi ràng buộc dữ liệu: Vi phạm các ràng buộc như khóa ngoại, giá trị duy nhất, v.v
  6. Lỗi chuyển đổi kiểu dữ liệu: Không thể chuyển đổi dữ liệu giữa Python và SQL Server
  7. Lỗi tài nguyên: Hết bộ nhớ, kết nối, v.v

1.2. Hệ thống log trong Python

Python cung cấp module logging tiêu chuẩn giúp ghi log với nhiều cấp độ khác nhau:

  • DEBUG: Thông tin chi tiết, thường dùng khi gỡ lỗi
  • INFO: Xác nhận mọi thứ đang hoạt động như mong đợi
  • WARNING: Chỉ ra rằng có điều gì đó không mong muốn xảy ra, nhưng ứng dụng vẫn hoạt động
  • ERROR: Do lỗi, ứng dụng không thể thực hiện một số chức năng
  • CRITICAL: Lỗi nghiêm trọng, ứng dụng có thể không tiếp tục hoạt động

2. Thiết lập cơ bản cho logging

2.1. Thiết lập logging cơ bản

import logging

# Cấu hình cơ bản
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='sql_operations.log',
filemode='a' # Append mode
)

# Tạo logger
logger = logging.getLogger('sql_server_operations')

2.2. Cấu hình handlers đa dạng

import logging
import os
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

def setup_logger(name, log_file, level=logging.INFO):
"""Thiết lập logger với file và console handlers"""

# Tạo thư mục logs nếu chưa tồn tại
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)

# Tạo và cấu hình logger
logger = logging.getLogger(name)
logger.setLevel(level)

# Ngăn log trùng lặp
if logger.handlers:
return logger

# Tạo file handler sử dụng RotatingFileHandler
file_handler = RotatingFileHandler(
log_file, maxBytes=10*1024*1024, backupCount=5
)
file_handler.setLevel(level)

# Tạo console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(level)

# Tạo formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)

# Thêm handlers vào logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)

return logger

# Sử dụng
sql_logger = setup_logger(
'sql_server_operations',
os.path.join('logs', 'sql_operations.log')
)

2.3. Sử dụng TimedRotatingFileHandler để phân chia log theo thời gian

def setup_timed_logger(name, log_file, level=logging.INFO):
"""Thiết lập logger với TimedRotatingFileHandler để phân chia log theo ngày"""

logger = logging.getLogger(name)
logger.setLevel(level)

if logger.handlers:
return logger

# Tạo thư mục logs nếu chưa tồn tại
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)

# Tạo file handler sử dụng TimedRotatingFileHandler
# Phân chia file log mỗi ngày, giữ lại 30 file
file_handler = TimedRotatingFileHandler(
log_file, when='midnight', interval=1, backupCount=30
)
file_handler.setLevel(level)

# Tạo formatter bao gồm nhiều thông tin hơn
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s'
)
file_handler.setFormatter(formatter)

logger.addHandler(file_handler)

return logger

# Sử dụng
detailed_logger = setup_timed_logger(
'sql_detailed_operations',
os.path.join('logs', 'sql_operations_detailed.log')
)

3. Bắt và xử lý lỗi SQL Server

3.1. Xử lý lỗi cơ bản với try-except

import pyodbc
import logging

logger = logging.getLogger('sql_server_operations')

def execute_query(conn_string, query, params=None):
"""Thực thi truy vấn SQL với xử lý lỗi cơ bản"""

conn = None
cursor = None

try:
# Thiết lập kết nối
conn = pyodbc.connect(conn_string)
cursor = conn.cursor()

# Thực thi truy vấn
logger.info(f"Thực thi truy vấn: {query[:100]}...")

if params:
cursor.execute(query, params)
else:
cursor.execute(query)

# Commit nếu là truy vấn thay đổi dữ liệu
if query.strip().upper().startswith(('INSERT', 'UPDATE', 'DELETE')):
conn.commit()
logger.info("Đã commit thay đổi")
return cursor.rowcount # Trả về số hàng bị ảnh hưởng

# Lấy kết quả nếu là truy vấn SELECT
results = cursor.fetchall()
logger.info(f"Truy vấn trả về {len(results)} kết quả")
return results

except pyodbc.Error as e:
if conn:
conn.rollback()
logger.error(f"Lỗi SQL: {str(e)}")
raise

except Exception as e:
if conn:
conn.rollback()
logger.error(f"Lỗi không xác định: {str(e)}")
raise

finally:
# Đảm bảo đóng cursor và connection
if cursor:
cursor.close()
if conn:
conn.close()
logger.debug("Đã đóng kết nối")

3.2. Xử lý lỗi chi tiết theo từng loại lỗi

import pyodbc
import time
import logging
from functools import wraps

logger = logging.getLogger('sql_detailed_operations')

# Định nghĩa các mã lỗi SQL Server phổ biến
SQL_TIMEOUT_ERROR = '08S01' # Timeout
SQL_CONNECTION_ERROR = '08001' # Không thể kết nối
SQL_CONSTRAINT_VIOLATION = '23000' # Vi phạm ràng buộc

def retry_on_connection_error(max_attempts=3, delay=2):
"""Decorator để thử lại khi gặp lỗi kết nối"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
attempts = 0
last_exception = None

while attempts < max_attempts:
try:
return func(*args, **kwargs)
except pyodbc.Error as e:
error_code = e.args[0] if len(e.args) > 0 else "Unknown"

# Chỉ thử lại với lỗi kết nối
if error_code in (SQL_TIMEOUT_ERROR, SQL_CONNECTION_ERROR):
attempts += 1
wait_time = delay * attempts # Tăng thời gian chờ theo số lần thử

logger.warning(
f"Lỗi kết nối (mã: {error_code}). "
f"Thử lại lần {attempts}/{max_attempts} sau {wait_time} giây. "
f"Chi tiết: {str(e)}"
)

time.sleep(wait_time)
last_exception = e
else:
# Với các lỗi khác thì ném ra ngay
raise

# Nếu đã thử hết số lần mà vẫn lỗi
logger.error(f"Đã thử {max_attempts} lần nhưng vẫn thất bại: {str(last_exception)}")
raise last_exception

return wrapper
return decorator


class SQLServerError(Exception):
"""Lớp cơ sở cho các lỗi SQL Server tùy chỉnh"""
def __init__(self, message, original_error=None, query=None, params=None):
self.message = message
self.original_error = original_error
self.query = query
self.params = params
super().__init__(self.message)


class SQLConnectionError(SQLServerError):
"""Lỗi kết nối đến SQL Server"""
pass


class SQLConstraintError(SQLServerError):
"""Lỗi vi phạm ràng buộc dữ liệu"""
pass


class SQLTimeoutError(SQLServerError):
"""Lỗi timeout khi thực thi truy vấn"""
pass


class SQLSyntaxError(SQLServerError):
"""Lỗi cú pháp SQL"""
pass


@retry_on_connection_error(max_attempts=3, delay=2)
def execute_query_advanced(conn_string, query, params=None, timeout=30):
"""Thực thi truy vấn SQL với xử lý lỗi chi tiết"""

conn = None
cursor = None
start_time = time.time()

try:
# Log thông tin truy vấn
if params:
masked_params = ['***' if i > 1 else str(p)[:10] for i, p in enumerate(params)]
logger.info(f"Thực thi truy vấn với tham số: {query[:100]}... - Params: {masked_params}")
else:
logger.info(f"Thực thi truy vấn: {query[:100]}...")

# Thiết lập kết nối
conn = pyodbc.connect(conn_string, timeout=timeout)
cursor = conn.cursor()

# Thực thi truy vấn
if params:
cursor.execute(query, params)
else:
cursor.execute(query)

# Đo thời gian thực thi
execution_time = time.time() - start_time

# Xác định loại truy vấn và xử lý kết quả phù hợp
query_type = query.strip().upper().split()[0] if query.strip() else ""

if query_type in ('INSERT', 'UPDATE', 'DELETE'):
conn.commit()
affected_rows = cursor.rowcount
logger.info(f"Đã commit thay đổi. {affected_rows} hàng bị ảnh hưởng. "
f"Thời gian thực thi: {execution_time:.3f}s")
return affected_rows

elif query_type == 'SELECT':
columns = [column[0] for column in cursor.description]
results = cursor.fetchall()
row_count = len(results)

logger.info(f"Truy vấn SELECT thành công. Trả về {row_count} kết quả. "
f"Thời gian thực thi: {execution_time:.3f}s")

# Log mẫu dữ liệu (chỉ log vài hàng đầu tiên để tránh quá tải)
if row_count > 0 and logger.level <= logging.DEBUG:
sample_data = str(results[0])
if len(sample_data) > 200:
sample_data = sample_data[:200] + "..."
logger.debug(f"Mẫu dữ liệu: {sample_data}")

# Trả về kết quả dưới dạng list of dict để dễ sử dụng
return [dict(zip(columns, row)) for row in results]

else:
conn.commit()
logger.info(f"Đã thực thi truy vấn. Thời gian thực thi: {execution_time:.3f}s")
return True

except pyodbc.Error as e:
# Rollback transaction nếu có lỗi
if conn:
try:
conn.rollback()
logger.info("Đã rollback transaction")
except Exception:
pass

# Xác định mã lỗi
error_code = e.args[0] if len(e.args) > 0 else "Unknown"
error_message = str(e)

# Ghi log và ném ra exception tùy chỉnh tương ứng
if error_code == SQL_CONNECTION_ERROR:
logger.error(f"Lỗi kết nối SQL Server: {error_message}")
raise SQLConnectionError("Không thể kết nối đến SQL Server", e, query, params)

elif error_code == SQL_TIMEOUT_ERROR:
logger.error(f"Lỗi timeout SQL: {error_message}")
raise SQLTimeoutError("Truy vấn bị timeout", e, query, params)

elif error_code == SQL_CONSTRAINT_VIOLATION:
logger.error(f"Lỗi vi phạm ràng buộc dữ liệu: {error_message}")
raise SQLConstraintError("Vi phạm ràng buộc dữ liệu", e, query, params)

elif 'syntax' in error_message.lower():
logger.error(f"Lỗi cú pháp SQL: {error_message}")
raise SQLSyntaxError("Lỗi cú pháp trong truy vấn SQL", e, query, params)

else:
logger.error(f"Lỗi SQL không xác định (mã: {error_code}): {error_message}")
raise SQLServerError(f"Lỗi SQL Server: {error_message}", e, query, params)

except Exception as e:
# Xử lý các lỗi khác không phải từ SQL Server
if conn:
try:
conn.rollback()
logger.info("Đã rollback transaction")
except Exception:
pass

logger.error(f"Lỗi không xác định: {str(e)}", exc_info=True)
raise

finally:
# Đảm bảo đóng cursor và connection
if cursor:
cursor.close()
if conn:
conn.close()
logger.debug("Đã đóng kết nối DB")

4. Thiết kế hệ thống log toàn diện

4.1. Tạo lớp Logger tùy chỉnh

import logging
import os
import json
import traceback
import socket
from datetime import datetime
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler


class SQLLogger:
"""Lớp logger tùy chỉnh cho các thao tác với SQL Server"""

def __init__(self, app_name, log_dir='logs', log_level=logging.INFO):
"""Khởi tạo hệ thống log"""
self.app_name = app_name
self.log_dir = log_dir
self.log_level = log_level
self.hostname = socket.gethostname()

# Tạo thư mục logs nếu chưa tồn tại
if not os.path.exists(log_dir):
os.makedirs(log_dir)

# Thiết lập các logger
self.setup_loggers()

def setup_loggers(self):
"""Thiết lập các logger khác nhau cho từng mục đích"""

# Logger cho các thao tác SQL thông thường
self.sql_logger = self._create_logger(
'sql_operations',
os.path.join(self.log_dir, 'sql_operations.log'),
self.log_level
)

# Logger chi tiết cho lỗi
self.error_logger = self._create_logger(
'sql_errors',
os.path.join(self.log_dir, 'sql_errors.log'),
logging.ERROR
)

# Logger cho các truy vấn chậm
self.slow_query_logger = self._create_logger(
'slow_queries',
os.path.join(self.log_dir, 'slow_queries.log'),
logging.WARNING
)

def _create_logger(self, name, log_file, level):
"""Tạo logger với file handler và formatter"""

# Tạo logger với tên ứng dụng làm prefix
logger_name = f"{self.app_name}.{name}"
logger = logging.getLogger(logger_name)
logger.setLevel(level)

# Ngăn log trùng lặp
if logger.handlers:
return logger

# Tạo file handler với rotation
file_handler = TimedRotatingFileHandler(
log_file, when='midnight', interval=1, backupCount=30
)
file_handler.setLevel(level)

# Tạo formatter chi tiết
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(name)s - [%(hostname)s] - '
'%(pathname)s:%(lineno)d - %(message)s'
)

# Thêm thông tin hostname vào formatter
old_factory = logging.getLogRecordFactory()

def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.hostname = self.hostname
return record

logging.setLogRecordFactory(record_factory)

file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

return logger

def log_query(self, query, params=None, duration=None, result_count=None):
"""Ghi log truy vấn SQL"""

# Tạo thông tin log
log_data = {
'query': query[:500] + ('...' if len(query) > 500 else ''),
'timestamp': datetime.now().isoformat(),
'hostname': self.hostname
}

# Thêm params nếu có (che dấu thông tin nhạy cảm)
if params:
masked_params = []
for p in params:
if isinstance(p, str) and len(p) > 10:
masked_params.append(p[:5] + '...' + p[-2:])
else:
masked_params.append(p)
log_data['params'] = masked_params

# Thêm thời gian thực thi nếu có
if duration:
log_data['duration'] = f"{duration:.3f}s"

# Log truy vấn chậm (>1s) vào logger riêng
if duration > 1.0:
self.slow_query_logger.warning(
f"Truy vấn chậm: {log_data['query']} - "
f"Thời gian: {log_data['duration']}"
)

# Thêm số lượng kết quả nếu có
if result_count is not None:
log_data['result_count'] = result_count

# Ghi log thông thường
self.sql_logger.info(f"SQL Query: {json.dumps(log_data)}")

return log_data

def log_error(self, error, query=None, params=None, context=None):
"""Ghi log lỗi SQL với thông tin chi tiết"""

error_type = type(error).__name__
error_message = str(error)
stack_trace = traceback.format_exc()

# Tạo thông tin log
error_data = {
'error_type': error_type,
'error_message': error_message,
'timestamp': datetime.now().isoformat(),
'hostname': self.hostname,
'stack_trace': stack_trace
}

# Thêm query nếu có
if query:
error_data['query'] = query[:500] + ('...' if len(query) > 500 else '')

# Thêm params nếu có (che dấu thông tin nhạy cảm)
if params:
masked_params = []
for p in params:
if isinstance(p, str) and len(p) > 10:
masked_params.append(p[:5] + '...' + p[-2:])
else:
masked_params.append(p)
error_data['params'] = masked_params

# Thêm context nếu có
if context:
error_data['context'] = context

# Ghi log lỗi
self.error_logger.error(f"SQL Error: {json.dumps(error_data)}")

# Đồng thời ghi log thông thường
self.sql_logger.error(
f"SQL Error: {error_type} - {error_message}"
)

return error_data

def log_transaction(self, action, affected_rows=None, duration=None):
"""Ghi log các thao tác transaction"""

log_data = {
'action': action,
'timestamp': datetime.now().isoformat(),
'hostname': self.hostname
}

if affected_rows is not None:
log_data['affected_rows'] = affected_rows

if duration:
log_data['duration'] = f"{duration:.3f}s"

self.sql_logger.info(f"Transaction: {json.dumps(log_data)}")

return log_data

4.2. Tích hợp logger tùy chỉnh với thao tác SQL

import pyodbc
import time
from contextlib import contextmanager

class SQLServerDatabase:
"""Lớp quản lý kết nối và thao tác với SQL Server kèm logging"""

def __init__(self, conn_string, app_name="SQLApp", log_dir="logs"):
"""Khởi tạo với chuỗi kết nối và thiết lập logger"""
self.conn_string = conn_string
self.logger = SQLLogger(app_name, log_dir)

@contextmanager
def connection(self):
"""Context manager để quản lý kết nối tự động đóng"""
conn = None
try:
conn = pyodbc.connect(self.conn_string)
yield conn
except pyodbc.Error as e:
self.logger.log_error(e, context="establishing connection")
raise
finally:
if conn:
conn.close()

@contextmanager
def transaction(self):
"""Context manager để quản lý transaction"""
with self.connection() as conn:
try:
# Bắt đầu transaction
start_time = time.time()
self.logger.log_transaction("START")

yield conn

# Commit transaction nếu không có lỗi
conn.commit()
duration = time.time() - start_time
self.logger.log_transaction("COMMIT", duration=duration)

except Exception as e:
# Rollback transaction nếu có lỗi
conn.rollback()
duration = time.time() - start_time
self.logger.log_transaction("ROLLBACK", duration=duration)

# Log lỗi
self.logger.log_error(e, context="transaction")
raise

def execute_query(self, query, params=None):
"""Thực thi truy vấn và trả về kết quả"""
with self.connection() as conn:
try:
start_time = time.time()
cursor = conn.cursor()

# Thực thi truy vấn
if params:
cursor.execute(query, params)
else:
cursor.execute(query)

# Lấy kết quả nếu là truy vấn SELECT
if query.strip().upper().startswith("SELECT"):
columns = [column[0] for column in cursor.description]
results = cursor.fetchall()
result_count = len(results)

# Tính thời gian thực thi
duration = time.time() - start_time

# Log truy vấn
self.logger.log_query(
query, params, duration=duration, result_count=result_count
)

# Trả về kết quả dưới dạng list of dict
return [dict(zip(columns, row)) for row in results]
else:
# Đối với các truy vấn thay đổi dữ liệu
affected_rows = cursor.rowcount

# Tính thời gian thực thi
duration = time.time() - start_time

# Log truy vấn
self.logger.log_query(
query, params, duration=duration, result_count=affected_rows
)

# Commit thay đổi
conn.commit()

# Trả về số hàng bị ảnh hưởng
return affected_rows

except Exception as e:
# Log lỗi
self.logger.log_error(e, query, params)
raise

def execute_many(self, query, params_list):
"""Thực thi nhiều truy vấn với danh sách tham số"""
with self.transaction() as conn:
try:
start_time = time.time()
cursor = conn.cursor()

# Thực thi executemany
cursor.executemany(query, params_list)

# Tính thời gian thực thi
duration = time.time() - start_time

# Lấy số hàng bị ảnh hưởng
affected_rows = cursor.rowcount

# Log thông tin
self.logger.log_query(
query,
f"[{len(params_list)} parameter sets]",
duration=duration,
result_count=affected_rows
)

return affected_rows

except Exception as e:
# Log lỗi
self.logger.log_error(e, query, f"[{len(params_list)} parameter sets]")
raise

def bulk_insert(self, table_name, data, batch_size=1000):
"""Thực hiện bulk insert với logging chi tiết"""
if not data:
return 0

total_rows = len(data)
total_batches = (total_rows + batch_size - 1) // batch_size
total_inserted = 0
start_total_time = time.time()

self.logger.sql_logger.info(
f"Bắt đầu bulk insert vào bảng {table_name}. "
f"{total_rows} hàng, {total_batches} batch(es)"
)

try:
# Lấy tên các cột từ dữ liệu
if isinstance(data[0], dict):
columns = list(data[0].keys())
# Chuyển dữ liệu từ dict sang list
data_values = [[row[col] for col in columns] for row in data]
else:
raise ValueError("Data phải là list of dict")

# Xây dựng câu truy vấn insert
placeholders = ','.join('?' for _ in columns)
column_names = ','.join(columns)
query = f"INSERT INTO {table_name} ({column_names}) VALUES ({placeholders})"

# Thực hiện insert theo batch
for i in range(0, total_rows, batch_size):
batch_start_time = time.time()

# Lấy một batch dữ liệu
batch = data_values[i:i+batch_size]
batch_size_actual = len(batch)

# Thực hiện insert
with self.transaction() as conn:
cursor = conn.cursor()
cursor.executemany(query, batch)
batch_affected = cursor.rowcount

# Tính thời gian và log
batch_duration = time.time() - batch_start_time
total_inserted += batch_affected

self.logger.sql_logger.info(
f"Batch {(i//batch_size)+1}/{total_batches}: "
f"Đã insert {batch_affected}/{batch_size_actual} hàng "
f"trong {batch_duration:.3f}s"
)

# Log tổng kết
total_duration = time.time() - start_total_time
self.logger.sql_logger.info(
f"Hoàn thành bulk insert vào bảng {table_name}. "
f"Tổng số: {total_inserted}/{total_rows} hàng "
f"trong {total_duration:.3f}s"
)

return total_inserted

except Exception as e:
# Log lỗi
self.logger.log_error(
e,
context=f"bulk_insert to {table_name}, {total_rows} rows"
)
raise

5. Ứng dụng thực tế

5.1. Ví dụ sử dụng lớp Database với xử lý lỗi

# Ví dụ sử dụng lớp Database để thao tác với SQL Server
conn_string = 'DRIVER={SQL Server};SERVER=your_server;DATABASE=your_db;UID=your_user;PWD=your_password'

# Khởi tạo đối tượng Database
db = SQLServerDatabase(conn_string, app_name="SalesAnalytics", log_dir="logs/sales")

try:
# Truy vấn đơn giản
results = db.execute_query(
"SELECT TOP 10 * FROM DuLieuBanHang WHERE NgayBan >= ?",
['2024-01-01']
)
print(f"Lấy được {len(results)} kết quả")

# Thực hiện insert với transaction
with db.transaction() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO DuLieuBanHang (NgayBan, MaSanPham, SoLuong, DoanhThu)
VALUES (?, ?, ?, ?)
""", ['2024-05-01', 'SP001', 5, 1500000])

# Có thể thực hiện nhiều lệnh trong cùng một transaction
cursor.execute("""
UPDATE TongKetDoanhThu
SET TongDoanhThu = TongDoanhThu + ?
WHERE Thang = 5 AND Nam = 2024
""", [1500000])

# Bulk insert dữ liệu
sales_data = [
{
'NgayBan': '2024-05-01',
'MaSanPham': f'SP{i:03d}',
'SoLuong': i % 10 + 1,
'DoanhThu': (i % 10 + 1) * 300000
}
for i in range(1, 101)
]

inserted = db.bulk_insert('DuLieuBanHang', sales_data, batch_size=20)
print(f"Đã insert {inserted} hàng dữ liệu")

except SQLConnectionError as e:
print(f"Lỗi kết nối: {e.message}")
# Thử kết nối lại hoặc thông báo cho người dùng

except SQLConstraintError as e:
print(f"Lỗi ràng buộc dữ liệu: {e.message}")
# Kiểm tra và sửa dữ liệu

except SQLTimeoutError as e:
print(f"Truy vấn bị timeout: {e.message}")
# Tối ưu truy vấn hoặc tăng timeout

except SQLSyntaxError as e:
print(f"Lỗi cú pháp SQL: {e.message}")
# Sửa lỗi cú pháp trong truy vấn

except SQLServerError as e:
print(f"Lỗi SQL Server: {e.message}")
# Xử lý lỗi chung từ SQL Server

except Exception as e:
print(f"Lỗi không xác định: {str(e)}")
# Ghi log và thông báo lỗi chung

5.2. Xử lý lỗi khi làm việc với pandas và SQLAlchemy

import pandas as pd
from sqlalchemy import create_engine, text
import urllib
import logging

# Thiết lập logger
logger = logging.getLogger('pandas_sql')
logger.setLevel(logging.INFO)
handler = logging.FileHandler('logs/pandas_sql.log')
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logger.addHandler(handler)

def create_sql_engine(server, database, username, password):
"""Tạo SQLAlchemy engine với xử lý lỗi"""
try:
# Tạo chuỗi kết nối
params = urllib.parse.quote_plus(
f"DRIVER={{SQL Server}};SERVER={server};"
f"DATABASE={database};UID={username};PWD={password}"
)

# Tạo engine với cấu hình
engine = create_engine(
f"mssql+pyodbc:///?odbc_connect={params}",
pool_pre_ping=True, # Kiểm tra kết nối trước khi sử dụng
pool_recycle=3600, # Làm mới kết nối sau 1 giờ
connect_args={'timeout': 30} # Timeout kết nối là 30 giây
)

# Kiểm tra kết nối
with engine.connect() as conn:
result = conn.execute(text("SELECT 1"))
if result.scalar() == 1:
logger.info(f"Kết nối thành công đến {server}/{database}")
return engine
else:
raise Exception("Kiểm tra kết nối không thành công")

except Exception as e:
logger.error(f"Lỗi tạo kết nối SQL: {str(e)}")
raise

def read_sql_with_logging(query, engine, params=None):
"""Đọc dữ liệu từ SQL với pandas và log"""
try:
start_time = time.time()
logger.info(f"Bắt đầu đọc dữ liệu với query: {query[:200]}...")

# Thực hiện truy vấn
if params:
df = pd.read_sql(query, engine, params=params)
else:
df = pd.read_sql(query, engine)

# Log kết quả
duration = time.time() - start_time
row_count = len(df)
col_count = len(df.columns)

logger.info(
f"Hoàn thành đọc dữ liệu: {row_count} hàng × {col_count} cột "
f"trong {duration:.3f}s"
)

# Log thông tin về bộ nhớ sử dụng
memory_usage = df.memory_usage(deep=True).sum()
logger.info(f"Bộ nhớ sử dụng: {memory_usage/1024/1024:.2f} MB")

return df

except Exception as e:
logger.error(f"Lỗi đọc dữ liệu SQL: {str(e)}")
raise

def write_to_sql_with_logging(df, table_name, engine, if_exists='replace', chunksize=1000):
"""Ghi DataFrame vào SQL Server với logging"""
try:
start_time = time.time()
row_count = len(df)
logger.info(
f"Bắt đầu ghi {row_count} hàng vào bảng {table_name}, "
f"chế độ: {if_exists}, kích thước chunk: {chunksize}"
)

# Thực hiện ghi dữ liệu
df.to_sql(
table_name,
engine,
if_exists=if_exists,
chunksize=chunksize,
index=False
)

# Log kết quả
duration = time.time() - start_time
logger.info(
f"Hoàn thành ghi dữ liệu vào bảng {table_name}: "
f"{row_count} hàng trong {duration:.3f}s"
)

return True

except Exception as e:
logger.error(f"Lỗi ghi dữ liệu vào SQL: {str(e)}")
raise

5.3. Tích hợp với hệ thống giám sát

import requests
import socket
import json
import traceback
from datetime import datetime

class SQLMonitor:
"""Lớp giám sát SQL Server và gửi thông báo khi có lỗi"""

def __init__(self, webhook_url=None, email_config=None):
"""Khởi tạo với URL webhook và cấu hình email"""
self.webhook_url = webhook_url
self.email_config = email_config
self.hostname = socket.gethostname()
self.logger = logging.getLogger('sql_monitor')

# Thiết lập handler cho logger
handler = logging.FileHandler('logs/sql_monitor.log')
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)

def log_and_alert(self, error, query=None, context=None, alert_level='warning'):
"""Ghi log lỗi và gửi thông báo"""
# Tạo thông tin lỗi
error_data = {
'timestamp': datetime.now().isoformat(),
'hostname': self.hostname,
'error_type': type(error).__name__,
'error_message': str(error),
'stack_trace': traceback.format_exc(),
'alert_level': alert_level
}

if query:
error_data['query'] = query

if context:
error_data['context'] = context

# Ghi log
if alert_level == 'critical':
self.logger.critical(f"SQL Critical Error: {json.dumps(error_data)}")
elif alert_level == 'error':
self.logger.error(f"SQL Error: {json.dumps(error_data)}")
else:
self.logger.warning(f"SQL Warning: {json.dumps(error_data)}")

# Gửi thông báo
if alert_level in ('error', 'critical'):
self._send_alert(error_data)

def _send_alert(self, error_data):
"""Gửi thông báo lỗi qua webhook và/hoặc email"""

# Gửi qua webhook (ví dụ: Slack, Teams, etc.)
if self.webhook_url:
try:
# Định dạng thông báo
message = {
'text': f"SQL Error on {error_data['hostname']}",
'attachments': [{
'title': f"{error_data['error_type']}: {error_data['error_message']}",
'text': f"Context: {error_data.get('context', 'N/A')}\n"
f"Query: {error_data.get('query', 'N/A')}\n"
f"Time: {error_data['timestamp']}",
'color': 'danger' if error_data['alert_level'] == 'critical' else 'warning'
}]
}

# Gửi request
response = requests.post(
self.webhook_url,
json=message,
timeout=5
)

if response.status_code == 200:
self.logger.info("Đã gửi thông báo lỗi qua webhook")
else:
self.logger.warning(
f"Không thể gửi thông báo qua webhook. "
f"Status code: {response.status_code}"
)

except Exception as e:
self.logger.error(f"Lỗi khi gửi thông báo webhook: {str(e)}")

# Gửi qua email
if self.email_config:
try:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Tạo email
msg = MIMEMultipart()
msg['From'] = self.email_config['from']
msg['To'] = self.email_config['to']
msg['Subject'] = f"SQL Error on {error_data['hostname']}: {error_data['error_type']}"

# Tạo nội dung
body = f"""
<h2>SQL Error Details</h2>
<p><strong>Time:</strong> {error_data['timestamp']}</p>
<p><strong>Host:</strong> {error_data['hostname']}</p>
<p><strong>Error Type:</strong> {error_data['error_type']}</p>
<p><strong>Error Message:</strong> {error_data['error_message']}</p>

<h3>Context</h3>
<p>{error_data.get('context', 'N/A')}</p>

<h3>Query</h3>
<pre>{error_data.get('query', 'N/A')}</pre>

<h3>Stack Trace</h3>
<pre>{error_data['stack_trace']}</pre>
"""

msg.attach(MIMEText(body, 'html'))

# Gửi email
server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
server.starttls()
server.login(self.email_config['username'], self.email_config['password'])
server.send_message(msg)
server.quit()

self.logger.info("Đã gửi thông báo lỗi qua email")

except Exception as e:
self.logger.error(f"Lỗi khi gửi email thông báo: {str(e)}")

5.4. Hệ thống theo dõi hiệu suất truy vấn SQL

class SQLPerformanceTracker:
"""Lớp theo dõi hiệu suất truy vấn SQL"""

def __init__(self, log_dir='logs/performance'):
"""Khởi tạo với thư mục log"""
self.log_dir = log_dir

# Tạo thư mục nếu chưa tồn tại
if not os.path.exists(log_dir):
os.makedirs(log_dir)

# Thiết lập logger
self.logger = logging.getLogger('sql_performance')
handler = logging.FileHandler(os.path.join(log_dir, 'performance.log'))
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)

# Thống kê hiệu suất
self.stats = {
'total_queries': 0,
'total_duration': 0,
'max_duration': 0,
'slow_queries': 0,
'error_queries': 0,
'query_types': {
'SELECT': 0,
'INSERT': 0,
'UPDATE': 0,
'DELETE': 0,
'OTHER': 0
}
}

def track_query(self, query, duration, result_count=None, error=None):
"""Theo dõi một truy vấn SQL"""
try:
# Cập nhật thống kê
self.stats['total_queries'] += 1
self.stats['total_duration'] += duration
self.stats['max_duration'] = max(self.stats['max_duration'], duration)

if duration > 1.0: # Truy vấn chậm > 1 giây
self.stats['slow_queries'] += 1

if error:
self.stats['error_queries'] += 1

# Xác định loại truy vấn
first_word = query.strip().upper().split()[0] if query.strip() else "OTHER"
if first_word in self.stats['query_types']:
self.stats['query_types'][first_word] += 1
else:
self.stats['query_types']['OTHER'] += 1

# Log thông tin truy vấn
log_entry = {
'timestamp': datetime.now().isoformat(),
'query_type': first_word,
'duration': duration,
'result_count': result_count,
'has_error': error is not None,
'query_preview': query[:100] + ('...' if len(query) > 100 else '')
}

self.logger.info(f"Query Stats: {json.dumps(log_entry)}")

# Log chi tiết cho truy vấn chậm
if duration > 1.0:
slow_log_entry = {
'timestamp': datetime.now().isoformat(),
'duration': duration,
'query': query,
'result_count': result_count
}

with open(os.path.join(self.log_dir, 'slow_queries.log'), 'a') as f:
f.write(f"{json.dumps(slow_log_entry)}\n")

return log_entry

except Exception as e:
print(f"Lỗi khi theo dõi truy vấn: {str(e)}")

def get_statistics(self):
"""Lấy thống kê hiệu suất"""
stats = self.stats.copy()

# Tính thời gian trung bình
if stats['total_queries'] > 0:
stats['avg_duration'] = stats['total_duration'] / stats['total_queries']
else:
stats['avg_duration'] = 0

return stats

def generate_report(self, output_file=None):
"""Tạo báo cáo hiệu suất"""
stats = self.get_statistics()

report = f"""
SQL Performance Report - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
=============================================================

Summary:
--------
Total Queries: {stats['total_queries']}
Average Duration: {stats['avg_duration']:.6f} seconds
Maximum Duration: {stats['max_duration']:.6f} seconds
Slow Queries (>1s): {stats['slow_queries']} ({stats['slow_queries']/max(1, stats['total_queries'])*100:.2f}%)
Error Queries: {stats['error_queries']} ({stats['error_queries']/max(1, stats['total_queries'])*100:.2f}%)

Query Types:
------------
SELECT: {stats['query_types']['SELECT']} ({stats['query_types']['SELECT']/max(1, stats['total_queries'])*100:.2f}%)
INSERT: {stats['query_types']['INSERT']} ({stats['query_types']['INSERT']/max(1, stats['total_queries'])*100:.2f}%)
UPDATE: {stats['query_types']['UPDATE']} ({stats['query_types']['UPDATE']/max(1, stats['total_queries'])*100:.2f}%)
DELETE: {stats['query_types']['DELETE']} ({stats['query_types']['DELETE']/max(1, stats['total_queries'])*100:.2f}%)
OTHER: {stats['query_types']['OTHER']} ({stats['query_types']['OTHER']/max(1, stats['total_queries'])*100:.2f}%)
"""

if output_file:
with open(output_file, 'w') as f:
f.write(report)

return report

6. Tích hợp vào hệ thống trực tiếp

6.1. Tạo decorators cho xử lý lỗi tự động

from functools import wraps
import time
import traceback

def log_sql_operation(logger):
"""Decorator để log các thao tác SQL"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Lấy tên hàm
func_name = func.__name__
start_time = time.time()

try:
# Thực thi hàm
logger.info(f"Bắt đầu thực thi {func_name}")
result = func(*args, **kwargs)

# Tính thời gian thực thi
duration = time.time() - start_time

# Log kết quả thành công
logger.info(f"Thực thi {func_name} thành công trong {duration:.3f}s")

return result

except Exception as e:
# Tính thời gian thực thi
duration = time.time() - start_time

# Log lỗi
error_type = type(e).__name__
error_message = str(e)
stack_trace = traceback.format_exc()

logger.error(
f"Lỗi khi thực thi {func_name}: {error_type} - {error_message}\n"
f"Thời gian: {duration:.3f}s\n"
f"Stack trace: {stack_trace}"
)

# Ném lại exception
raise

return wrapper
return decorator

def retry_on_specific_errors(max_attempts=3, delay=2, error_types=(Exception,)):
"""Decorator để thử lại khi gặp lỗi cụ thể"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
attempts = 0
last_exception = None

while attempts < max_attempts:
try:
return func(*args, **kwargs)
except error_types as e:
attempts += 1
last_exception = e

# Tăng thời gian chờ theo số lần thử
wait_time = delay * attempts

if attempts < max_attempts:
logging.warning(
f"Lỗi khi thực thi {func.__name__}: {str(e)}\n"
f"Thử lại lần {attempts}/{max_attempts} sau {wait_time} giây"
)
time.sleep(wait_time)
else:
logging.error(
f"Đã thử lại {max_attempts} lần nhưng vẫn thất bại: {str(e)}"
)

# Nếu đã thử hết số lần mà vẫn lỗi
raise last_exception

return wrapper
return decorator

6.2. Tích hợp với FastAPI hoặc Flask

from fastapi import FastAPI, HTTPException, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import logging
import time

# Thiết lập logger
logger = logging.getLogger("api_sql_operations")
logger.setLevel(logging.INFO)
handler = logging.FileHandler("logs/api_sql.log")
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logger.addHandler(handler)

# Khởi tạo FastAPI app
app = FastAPI(title="SQL Operations API")

# Khởi tạo đối tượng Database
db = SQLServerDatabase(
conn_string='DRIVER={SQL Server};SERVER=your_server;DATABASE=your_db;UID=your_user;PWD=your_password',
app_name="API_Service",
log_dir="logs/api"
)

# Model cho request
class DataQuery(BaseModel):
query: str
params: list = None

# Middleware để log request và response
@app.middleware("http")
async def log_requests(request, call_next):
start_time = time.time()

# Log request
logger.info(f"Request: {request.method} {request.url}")

try:
# Xử lý request
response = await call_next(request)

# Tính thời gian xử lý
duration = time.time() - start_time

# Log response
logger.info(f"Response: {response.status_code} in {duration:.3f}s")

return response

except Exception as e:
# Log lỗi
duration = time.time() - start_time
logger.error(f"Error: {str(e)} in {duration:.3f}s")

# Trả về lỗi 500
return JSONResponse(
status_code=500,
content={"detail": "Internal Server Error"}
)

# Endpoint để thực thi truy vấn SELECT
@app.post("/query/select")
async def execute_select(query_data: DataQuery):
try:
# Log truy vấn
logger.info(f"Executing SELECT query: {query_data.query[:100]}...")

# Kiểm tra xem có phải truy vấn SELECT không
if not query_data.query.strip().upper().startswith("SELECT"):
raise HTTPException(
status_code=400,
detail="Only SELECT queries are allowed for this endpoint"
)

# Thực thi truy vấn
start_time = time.time()
results = db.execute_query(query_data.query, query_data.params)
duration = time.time() - start_time

# Log kết quả
logger.info(f"Query executed in {duration:.3f}s, returned {len(results)} rows")

return {
"success": True,
"duration": duration,
"row_count": len(results),
"results": results
}

except SQLServerError as e:
# Log lỗi
logger.error(f"SQL Error: {e.message}")

# Trả về lỗi
raise HTTPException(
status_code=400,
detail=f"SQL Error: {e.message}"
)

except Exception as e:
# Log lỗi không xác định
logger.error(f"Unexpected error: {str(e)}")

# Trả về lỗi 500
raise HTTPException(
status_code=500,
detail="Internal server error"
)

Kết luận

Bắt lỗi và log chi tiết SQL Server

Bắt lỗi và log chi tiết khi thao tác SQL Server bằng Python là một phần không thể thiếu trong việc xây dựng các ứng dụng dữ liệu chuyên nghiệp, đáng tin cậy. Một hệ thống xử lý lỗi và ghi log tốt không chỉ giúp phát hiện và khắc phục sự cố nhanh chóng mà còn cung cấp thông tin quý giá để tối ưu hóa hiệu suất và độ tin cậy của hệ thống.

Các nguyên tắc quan trọng cần nhớ:

  1. Luôn xử lý lỗi một cách cụ thể: Phân loại và xử lý từng loại lỗi riêng biệt thay vì bắt tất cả các lỗi cùng một cách.
  2. Ghi log có cấu trúc và chi tiết: Bao gồm thông tin về thời gian, ngữ cảnh, truy vấn và tham số để dễ dàng phân tích.
  3. Sử dụng các cấp độ log phù hợp: DEBUG, INFO, WARNING, ERROR, CRITICAL cho từng loại thông tin khác nhau.
  4. Áp dụng log rotation: Ngăn chặn các file log quá lớn và khó quản lý.
  5. Tích hợp với hệ thống giám sát: Gửi thông báo khi có lỗi nghiêm trọng để xử lý kịp thời.
  6. Theo dõi hiệu suất truy vấn: Phát hiện và tối ưu các truy vấn chậm.

Với các kỹ thuật và công cụ được trình bày trong bài viết này, bạn có thể xây dựng một hệ thống logging và xử lý lỗi toàn diện, giúp ứng dụng của bạn trở nên ổn định, dễ bảo trì và hiệu quả hơn khi làm việc với SQL Server từ Python.